package com.bigdata.spark.streaming

import java.io.{File, FileWriter, PrintWriter}
import java.text.SimpleDateFormat
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.util.Date
import scala.collection.mutable.ListBuffer


/**
 * Spark Streaming job that counts ad-click records read from Kafka.
 *
 * Every 10 seconds it aggregates the records seen during the last 60 seconds
 * into 10-second buckets (keyed by the record's own timestamp) and rewrites
 * a JSON file with one `{xtime, yval}` entry per bucket.
 */
object SparkStreaming11_Req31 {

  import scala.util.Try

  /** Width of one aggregation bucket in milliseconds. */
  private val BucketMillis = 10000L

  /** File that is overwritten with the latest window result on every batch. */
  private val OutputPath = "datas/adclick/adclick.json"

  /**
   * Entry point: builds the streaming pipeline and blocks until it terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "sherry",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set("sherry"), kafkaPara)
      )

    // Echo the raw record values of each batch to stdout.
    kafkaDataDS.map(_.value()).print()

    // Malformed records are dropped here instead of failing the whole batch.
    val adClickData = kafkaDataDS.flatMap(record => parseClick(record.value()))

    // Clicks of the last minute, recomputed every 10 seconds.
    // Timestamps are floored to the start of their 10-second bucket:
    //   12:01 => 12:00, 12:11 => 12:10, 12:19 => 12:10, 12:25 => 12:20, 12:59 => 12:50
    // Integer division followed by multiplication performs the flooring:
    //   55 / 10 * 10 => 50, 49 / 10 * 10 => 40, 32 / 10 * 10 => 30
    val reduceDS = adClickData
      .map { click =>
        // parseClick already guaranteed that ts is a valid Long.
        val bucketStart = click.ts.toLong / BucketMillis * BucketMillis
        (bucketStart, 1)
      }
      // Window length and slide must be multiples of the 5 s batch interval.
      .reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(60), Seconds(10))

    //reduceDS.print()
    reduceDS.foreachRDD { rdd =>
      // Runs on the driver: the window holds only a handful of buckets,
      // so collecting them is cheap.
      val buckets: Array[(Long, Int)] = rdd.sortByKey(ascending = true).collect()
      writeOutput(renderJson(buckets))
    }

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Parses one space-separated line into an [[AdClickData]].
   *
   * @param line raw Kafka record value; may be null
   * @return the parsed record, or `None` when the line is null, has fewer
   *         than five fields, or its first field is not a valid `Long`
   */
  private def parseClick(line: String): Option[AdClickData] =
    Option(line).map(_.split(" ")).collect {
      // Extra trailing fields are ignored, as only the first five are used.
      case Array(ts, area, city, user, ad, _*) if Try(ts.toLong).isSuccess =>
        AdClickData(ts, area, city, user, ad)
    }

  /**
   * Renders the (bucket start, count) pairs as a JSON array string.
   *
   * @param buckets pairs of bucket start (interpreted as epoch milliseconds
   *                by `java.util.Date`) and click count, in output order
   * @return a JSON array with one `xtime`/`yval` object per bucket
   */
  private def renderJson(buckets: Array[(Long, Int)]): String = {
    // One formatter per call: SimpleDateFormat is not thread-safe, so it is
    // kept local rather than shared, but it need not be rebuilt per bucket.
    val formatter = new SimpleDateFormat("mm:ss")
    buckets
      .map { case (time, cnt) =>
        val timeString = formatter.format(new Date(time))
        s""" { "xtime":"${timeString}", "yval":"${cnt}" }"""
      }
      .mkString("[", ",", "]")
  }

  /**
   * Overwrites the output file with the given content followed by a newline.
   *
   * Creates the parent directory when it is missing and always closes the
   * writer, even if writing fails.
   *
   * @param json text to write
   */
  private def writeOutput(json: String): Unit = {
    val target = new File(OutputPath)
    Option(target.getParentFile).foreach(_.mkdirs())

    val out = new PrintWriter(new FileWriter(target))
    try out.println(json)
    finally out.close() // close() also flushes
  }

  /**
   * One ad-click record as read from Kafka.
   *
   * `ts` is parsed as a `Long` and treated as epoch milliseconds by the job;
   * the remaining fields are carried through unchanged (presumably region,
   * city, user id and ad id, judging by their names).
   */
  case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)

}
